package kafka.admin;

import kafka.admin.ConsumerGroupCommand;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class KafkaConsumerService implements ConsumerService, Closeable {

    private String bootstrapServers;
    private ConsumerGroupCommand.KafkaConsumerGroupService consumerGroupService;

    public KafkaConsumerService(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        consumerGroupService = getKafkaConsumerGroupService();
    }

    @Override
    public List<String> listConsumerGroups() {
        return JavaConverters.seqAsJavaList(consumerGroupService.listGroups());
    }

    private ConsumerGroupCommand.KafkaConsumerGroupService getKafkaConsumerGroupService() {
        String[] options = new String[]{
                "--bootstrap-server", bootstrapServers,
        };
        ConsumerGroupCommand.ConsumerGroupCommandOptions consumerGroupCommandOptions = new ConsumerGroupCommand.ConsumerGroupCommandOptions(options);
        return new ConsumerGroupCommand.KafkaConsumerGroupService(consumerGroupCommandOptions);
    }

    @Override
    public void close() throws IOException {
        if (this.consumerGroupService != null) {
            this.consumerGroupService.close();
        }
    }

    public List<PartitionAssignmentState> collectGroupAssignment(String group) {
        Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> optionOptionTuple2 = consumerGroupService.collectGroupAssignment(group);
        Seq<ConsumerGroupCommand.PartitionAssignmentState> partitionAssignmentStateSeq = optionOptionTuple2._2().get();
        return JavaConversions.seqAsJavaList(partitionAssignmentStateSeq).stream().map(this::toPartitionAssignmentState).collect(Collectors.toList());
    }

    private PartitionAssignmentState toPartitionAssignmentState(ConsumerGroupCommand.PartitionAssignmentState partitionAssignmentState) {

        return PartitionAssignmentState.builder()
                .group(partitionAssignmentState.group())
                .clientId(partitionAssignmentState.clientId().get())
                .consumerId(partitionAssignmentState.consumerId().get())
                .coordinator(partitionAssignmentState.coordinator().get())
                .topic(partitionAssignmentState.topic().get())
                .partition((Long) partitionAssignmentState.partition().get())
                .build();

    }
}
